Data Exchange
Example Process for Automating Data Upload
The following Python scripts illustrate how a process can be set up to monitor an upload folder for new files and manage the upload via Data Exchange, including checks for upload errors, using the API endpoint POST /api/v2/DataExchangeJob.
- """
- Example script to automate data exchange (Assetic.DataExchangeProcessNew.py)
- First of two scripts
- This script uploads to Assetic the files in the 'ToLoad' folder. The
- filename is parsed to get the asset category which allows the data exchange
- profile to be determined. A data exchange task is created for the document
- The name of the file is changed to the task id and moved to the 'InProgress'
- folder. This allows the subsequent script to identify the task and check
- progress
- """
- import assetic
- import os
- import shutil
- import sys
- import base64
- # Assetic SDK instance
- asseticsdk = assetic.AsseticSDK("C:/Users/kwilton/assetic.ini",None,"Info")
- ##API instances
- # Document API
- docapi = assetic.DocumentApi()
- # Data Exchange API
- dataexchangeapi = assetic.DataExchangeJobApi()
- # File Processing Directory structure
- intdir = "C:/temp/Integration/"
- sourcedir = intdir + "ToLoad/"
- inprogress = intdir + "InProgress/"
- errordir = intdir + "Error/"
- processeddir = intdir + "Processed/"
- ##preview
- profiles = {'Sewer Nodes': 'c7e6d3ed-3917-e611-9458-06edd62954d7',
- 'Roads': 'd889d99b-3317-e611-9458-06edd62954d7'}
- ##demo
- profiles = {'Sewer Nodes': 'c7e6d3ed-3917-e611-9458-06edd62954d7',
- 'Roads': '876a0a4e-c732-e611-945f-06edd62954d7',
- 'Water Pressure Pipes': '3abd431c-2017-e611-9812-0632cf3be881'}
- ##Loop through files in folder
- files = os.listdir(sourcedir)
- for filename in files:
- fullfilename = sourcedir + filename
- #get category, assumes file name has a whitespace between date and category
- filedate, categorywithext = filename.split(None,1)
- category, fileext = os.path.splitext(categorywithext)
- asseticsdk.logger.info("File Date: {0}, Category: {1}".format(
- filedate, category))
- ##read file and encode for upload
- with open(fullfilename, "rb") as f:
- data = f.read()
- if sys.version_info < (3,0):
- filecontents = data.encode("base64")
- else:
- filecontents = base64.b64encode(data)
- filecontents = filecontents.decode(encoding="utf-8", errors="strict")
- ##create file properties object and set values
- file_properties = \
- assetic.Assetic3IntegrationRepresentationsFilePropertiesRepresentation()
- file_properties.name = filename
- file_properties.mimetype = 'csv'
- file_properties.filecontent = filecontents
- filearray = [file_properties]
- ##create document object and assign values, including file properties
- document = \
- assetic.Assetic3IntegrationRepresentationsDocumentRepresentation()
- document.label = 'Data Exchange - ' + filename
- document.file_extension = 'csv'
- document.document_type = 'DataExchange'
- document.mime_type = 'csv'
- document.doc_group_id = 1
- document.file_property = filearray
- #Upload document and get ID from response
- doc = docapi.document_post(document)
- docid = doc[0].get('Id')
- #get dataexchange profile for category
- profileid = profiles.get(category)
- ##prepare data exchange parameters
- job = assetic.Assetic3IntegrationRepresentationsDataExchangeJobRepresentation()
- job.profile_id = profileid
- job.document_id = docid
- asseticsdk.logger.info("Profile ID:{0}".format(job.profile_id))
- asseticsdk.logger.info("Job Document ID:{0}".format(job.document_id))
- try:
- task = dataexchangeapi.data_exchange_job_post(job)
- #move file to in-progress, and rename to task id to allow checks
- #on success/failure
- destination = inprogress + task + '.csv'
- shutil.move(fullfilename,destination)
- except assetic.rest.ApiException as e:
- #gross error - move to error folder??
- asseticsdk.logger.error('Status {0}, Reason: {1}'.format(
- e.status,e.reason))
- destination = errordir + task + '.csv'
- shutil.move(fullfilename,destination)
How it works
The folder locations are defined in the script. It assumes that an external process is populating the source folder with files that are to be uploaded.
##File Processing Directory structure
intdir = "C:/temp/Integration/"
sourcedir = intdir + "ToLoad/"
inprogress = intdir + "InProgress/"
errordir = intdir + "Error/"
processeddir = intdir + "Processed/"
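If the folders do not already exist they must be created before the scripts run. A minimal sketch of doing this with `os.makedirs` (this helper is an illustrative addition, not part of the original scripts; a temporary base folder is used here so the sketch runs anywhere):

```python
import os
import tempfile

# base folder; the production scripts use "C:/temp/Integration/"
intdir = tempfile.mkdtemp()

# create each processing folder if it does not already exist
for subdir in ("ToLoad", "InProgress", "Error", "Processed"):
    os.makedirs(os.path.join(intdir, subdir), exist_ok=True)
```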
Each file in the source folder is processed one at a time.
files = os.listdir(sourcedir)
for filename in files:
In this example each Asset Category has a separate Data Exchange profile, so the profile ID for each category is defined.
##List of categories and associated data exchange profiles
profiles = {'Sewer Nodes': 'c7e6d3ed-3917-e611-9458-06edd62954d7',
            'Roads': '876a0a4e-c732-e611-945f-06edd62954d7',
            'Water Pressure Pipes': '3abd431c-2017-e611-9812-0632cf3be881'}
The filename is parsed to determine the category, and hence the profile ID.
#get category, assumes file name has a whitespace between date and category
filedate, categorywithext = filename.split(None, 1)
category, fileext = os.path.splitext(categorywithext)
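For example, a file named `20160901 Sewer Nodes.csv` (a hypothetical file name) parses as follows:

```python
import os

filename = "20160901 Sewer Nodes.csv"  # hypothetical upload file

# split on the first whitespace: date, then category plus extension
filedate, categorywithext = filename.split(None, 1)
# strip the extension to leave the category name
category, fileext = os.path.splitext(categorywithext)

print(filedate)   # 20160901
print(category)   # Sewer Nodes
```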
Get the data exchange profile for the category.
profileid = profiles.get(category)
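Note that `dict.get` returns `None` when the category has no entry, which would create a job with an empty profile ID. A guard such as the following can skip unmapped files (the guard function is an illustrative addition, not part of the original script):

```python
# category-to-profile mapping as defined in the script
profiles = {'Sewer Nodes': 'c7e6d3ed-3917-e611-9458-06edd62954d7',
            'Roads': '876a0a4e-c732-e611-945f-06edd62954d7'}

def get_profile_id(category):
    # return the Data Exchange profile GUID for a category,
    # or None if the category has no configured profile
    profileid = profiles.get(category)
    if profileid is None:
        # an unmapped category would otherwise post a job with a
        # null profile id, so report it and let the caller skip
        print("No profile configured for category: {0}".format(category))
    return profileid
```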
The document is uploaded and the document ID obtained
#Upload document and get ID from response
doc = docapi.document_post(document)
docid = doc[0].get('Id')
After preparing the job information, the job is posted to Data Exchange. If an exception is returned it means there was a problem creating the job; perhaps the profile ID is incorrect, or there are insufficient privileges to create the job.
If there is no exception the Data Exchange task ID is returned. Since tasks are queued it may not run immediately. To allow the task to be checked at a later stage the file is renamed to the task ID and moved to the 'inprogress' folder.
try:
    task = dataexchangeapi.data_exchange_job_post(job)
    #move file to in-progress, renamed to the task id, to allow
    #later checks on success/failure
    destination = inprogress + task + '.csv'
    shutil.move(fullfilename, destination)
except assetic.rest.ApiException as e:
    #job creation failed, so there is no task id; move the file to
    #the error folder under its original name
    asseticsdk.logger.error('Status {0}, Reason: {1}'.format(
        e.status, e.reason))
    destination = errordir + filename
    shutil.move(fullfilename, destination)
Checking if a task has been processed
The following Python script uses the file name to check to see if the data exchange task has run. If it has run successfully then the file is moved to the 'Processed' folder, otherwise it is moved to the 'Error' folder. This script is scheduled to run periodically.
- """
- Example script to automate data exchange (Assetic.DataExchangeCheckTask.py)
- Second of two scripts
- This script checks the 'InProcess' folder to see if task has beeen
- processed in data exchange. The names of the files in this folder have been
- changed from the original name to the data exchange task id
- If a task has been processed then it moves the file to either the
- 'Processed' folder or the 'Error' folder. If there is an error
- the error file generated by data exchange is also downloaded and
- placed in the error folder
- """
- import assetic
- import os
- import shutil
- import sys
- ##Assetic SDK instance
- asseticsdk = assetic.AsseticSDK("c:/users/you/assetic.ini",None,"Info")
- ##create an instance of the DataExchange API
- dataexchangeapi = assetic.DataExchangeTaskApi()
- ##Create an instance for document get.
- ##Use sdk client for document get. Returns header 'content-disposition'
- ##The header has the file name of the doc
- #Document API
- docapi = assetic.DocumentApi(asseticsdk.client_for_docs)
- # Define file path for output report files
- intdir = "C:/temp/Integration/"
- sourcedir = intdir + "ToLoad/"
- inprogressdir = intdir + "InProgress/"
- errordir = intdir + "Error/"
- processeddir = intdir + "Processed/"
- ##define function for getting error file
- def getfile(docid,docapi,filedir):
- try:
- getfile = docapi.document_get_document_file(docid)
- except assetic.rest.ApiException as e:
- asseticsdk.logger.error('Status {0}, Reason: {1}'.format(
- e.status,e.reason))
- if getfile != None:
- if 'attachment' in getfile[1] and 'filename=' in getfile[1]:
- filename = getfile[1].split('filename=',1)[1]
- if '"' in filename or "'" in filename:
- filename = filename[1:-1]
- fullfilename = filedir + filename
- else:
- fullfilename = filedir +filename
- data = getfile[0]
- if sys.version_info >= (3,0):
- try:
- data = data.encode('utf-8')
- except:
- data = data
- with open( fullfilename, 'wb' ) as out_file:
- out_file.write(data)
- asseticsdk.logger.info('Created file: {0}'.format(fullfilename))
- ##end function
- files = os.listdir(inprogressdir)
- for filename in files:
- fullfilename = inprogressdir + filename
- taskguid, fileext = os.path.splitext(filename)
- try:
- task = dataexchangeapi.data_exchange_task_get(taskguid)
- except assetic.rest.ApiException as e:
- asseticsdk.logger.error('Status {0}, Reason: {1}'.format(
- e.status,e.reason))
- else:
- status = task.get('StatusDescription')
- if status == 'Complete with error':
- #move file to error
- destination = errordir + filename
- shutil.move(fullfilename,destination)
- ##also get error file
- errordocid = task.get('ErrorDocumentId')
- errordesc = task.get('Summary')
- if errordocid == None:
- asseticsdk.logger.info("Error Description: {0}".format(
- errordesc))
- else:
- asseticsdk.logger.info("Error document: {0}".format(errordocid))
- errordirtask = errordir + taskguid
- getfile(errordocid,docapi,errordirtask)
- elif status == 'Completed':
- #move file to Completed
- destination = processeddir + filename
- shutil.move(fullfilename,destination)
How it works
The script loops through the files in the in-progress folder and obtains the Data Exchange task id from each file name. This script is scheduled to run periodically.
files = os.listdir(inprogressdir)
for filename in files:
    fullfilename = inprogressdir + filename
    taskguid, fileext = os.path.splitext(filename)
The task is requested and the status of the response is checked.
try:
    task = dataexchangeapi.data_exchange_task_get(taskguid)
except assetic.rest.ApiException as e:
    asseticsdk.logger.error('Status {0}, Reason: {1}'.format(
        e.status, e.reason))
else:
    status = task.get('StatusDescription')
If the status is 'Complete with error' the file is moved to the error folder. If the error is due to a problem with field mapping, the 'Summary' describes the problem.
If there is a data error then an error file is generated by Data Exchange. This file is downloaded and saved in the 'error' folder along with the document. The function 'getfile' is used to get the file.
#move file to error
destination = errordir + filename
shutil.move(fullfilename, destination)
##also get error file
errordocid = task.get('ErrorDocumentId')
errordesc = task.get('Summary')
if errordocid is None:
    asseticsdk.logger.info("Error Description: {0}".format(errordesc))
else:
    asseticsdk.logger.info("Error document: {0}".format(errordocid))
    errordirtask = errordir + taskguid
    getfile(errordocid, docapi, errordirtask)
The function 'getfile' is defined in the script body. Defining functions within the script can make the script easier to read and avoids repeating common tasks.
##define function for getting error file
def getfile(docid, docapi, filedir):
    try:
        response = docapi.document_get_document_file(docid)
    except assetic.rest.ApiException as e:
        asseticsdk.logger.error('Status {0}, Reason: {1}'.format(
            e.status, e.reason))
        return
    if response is not None:
        if 'attachment' in response[1] and 'filename=' in response[1]:
            #get the file name from the content-disposition header
            filename = response[1].split('filename=', 1)[1]
            if '"' in filename or "'" in filename:
                #strip surrounding quotes from the file name
                filename = filename[1:-1]
            fullfilename = filedir + filename
            data = response[0]
            if sys.version_info >= (3, 0):
                try:
                    data = data.encode('utf-8')
                except AttributeError:
                    #already bytes, write as-is
                    pass
            with open(fullfilename, 'wb') as out_file:
                out_file.write(data)
            asseticsdk.logger.info('Created file: {0}'.format(fullfilename))
##end function
If the status is 'Completed' it means the Data Exchange task has completed without error. The file is moved to the 'Processed' folder and no further processing of that file takes place.
elif status == 'Completed':
    #move file to Processed
    destination = processeddir + filename
    shutil.move(fullfilename, destination)
If the task has not been processed then the file remains in the 'inprogress' folder.
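The routing logic above can be summarised as a small function: completed tasks go to 'Processed', failed tasks to 'Error', and any other status (for example a task still queued or running) leaves the file where it is. This refactoring is a sketch for illustration, not part of the original script:

```python
def destination_for(status):
    # map a Data Exchange task status to a destination folder name;
    # None means the task is still pending and the file stays put
    if status == 'Completed':
        return 'Processed'
    if status == 'Complete with error':
        return 'Error'
    return None
```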